云原生时代消息中间件Pulsar(介绍、集群安装部署、管理页面安装部署) 您所在的位置:网站首页 pulsar proxy 云原生时代消息中间件Pulsar(介绍、集群安装部署、管理页面安装部署)

云原生时代消息中间件Pulsar(介绍、集群安装部署、管理页面安装部署)

#云原生时代消息中间件Pulsar(介绍、集群安装部署、管理页面安装部署)| 来源: 网络整理| 查看: 265

Pulsar基本介绍

Apache Pulsar 是一个云原生企业级的发布订阅(pub-sub)消息系统,最初由Yahoo开发,并于2016年底开源,现在是Apache软件基金会顶级开源项目。Pulsar在Yahoo的生产环境运行了三年多,助力Yahoo的主要应用,如Yahoo Mail、Yahoo Finance、Yahoo Sports、Flickr、Gemini广告平台和Yahoo分布式键值存储系统Sherpa。

Apache Pulsar的功能与特性:

多租户模式:灵活的消息系统云原生架构segmented Sreams(分片流)支持跨地域复制 多租户

租户和命名空间(namespace)是 Pulsar 支持多租户的两个核心概念。

在租户级别,Pulsar 为特定的租户预留合适的存储空间、应用授权与认证机制。

在命名空间级别,Pulsar 有一系列的配置策略(policy),包括存储配额、流控、消息过期策略和命名空间之间的隔离策略。

在这里插入图片描述

灵活的消息系统

Pulsar 做了队列模型和流模型的统一,在 Topic 级别只需保存一份数据,同一份数据可多次消费。以流式、队列等方式计算不同的订阅模型大大提升了灵活度。

同时pulsar通过事务采用Exactly-Once(精准一次)在进行消息传输过程中, 可以确保数据不丢不重 在这里插入图片描述

云原生架构

Pulsar 使用计算与存储分离的云原生架构,数据从 Broker 搬离,存在共享存储内部。上层是无状态 Broker,复制消息分发和服务;下层是持久化的存储

Bookie 集群。Pulsar 存储是分片的,这种构架可以避免扩容时受限制,实现数据的独立扩展和快速恢复 在这里插入图片描述

Segmented Streams(分片流)

Pulsar 将无界的数据看作是分片的流,分片分散存储在分层存储(tiered storage)、BookKeeper 集群和 Broker 节点上,而对外提供一个统一的、无界数据的视图。其次,不需要用户显式迁移数据,减少存储成本并保持近似无限的存储。

在这里插入图片描述

支持跨地域复制

Pulsar 中的跨地域复制是将 Pulsar 中持久化的消息在多个集群间备份。在 Pulsar 2.4.0 中新增了复制订阅模式(Replicated-subscriptions) ,在某个集群失效情况下,该功能可以在其他集群恢复消费者的消费状态,从而达到热备模式下消息服务的高可用。 在这里插入图片描述

组件介绍 层级存储 Infinite Stream: 以流的方式永久保存原始数据分区的容量不再受限制充分利⽤云存储或现有的廉价存储 ( 例如 HDFS)数据统⼀表征:客户端无需关心数据究竟存储在哪⾥

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ORjke9Zk-1649237152127)( image-20220405170809598.png)]

Pulsar IO(Connector) 连接器 Pulsar IO 分为输入(Input)和输出(Output)两个模块,输入代表数据从哪里来,通过 Source 实现数据输入。输出代表数据要往哪里去,通过 Sink 实现数据输出。Pulsar 提出了 Connector (也称为 Pulsar IO),用于解决 Pulsar 与周边系统的集成问题,帮助用户高效完成工作。目前 pulsar IO 支持非常多的连接集成操作: 例如HDFS 、Spark、Flink 、Flume 、ES 、HBase等

在这里插入图片描述

Pulsar Funcations(轻量级计算框架) Pulsar Functions 是一个轻量级的计算框架,可以给用户提供一个部署简单、运维简单、API 简单的 FASS(Function as a service)平台。Pulsar Functions 提供基于事件的服务,支持有状态与无状态的多语言计算,是对复杂的大数据处理框架的有力补充。Pulsar Functions 的设计灵感来自于 Heron 这样的流处理引擎,Pulsar Functions 将会拓展 Pulsar 和整个消息领域的未来。使用 Pulsar Functions,用户可以轻松地部署和管理 function,通过 function 从 Pulsar topic 读取数据或者生产新数据到 Pulsar topic。

在这里插入图片描述

Pulsar与kafka的对比 模型概念

Kafka: producer – topic – consumer group – consumer ​ Pulsar: producer – topic -subsciption- consumer

消息消费模式

Kafka: 主要集中在流(Stream) 模式, 对单个partition是独占消费, 没有共享(Queue)的消费模式 ​ Pulsar: 提供了统一的消息模型和API. 流(Stream) 模式 – 独占和故障切换订阅方式 ; 队列(Queue)模式 – 共享订阅的方式

消息确认(ack)

Kafka: 使用偏移量 offset ​ Pulsar: 使用专门的cursor管理. 累积确认和kafka效果一样; 提供单条或选择性确认

消息保留:

Kafka: 根据设置的保留期来删除消息, 有可能消息没被消费, 过期后被删除, 不支持TTL ​ Pulsar: 消息只有被所有订阅消费后才会删除, 不会丢失数据,. 也运行设置保留期, 保留被消费的数据 . 支持TTL

存储方式

Apache Kafka和Apache Pulsar都有类似的消息概念。 客户端通过主题与消息系统进行交互。 每个主题都可以分为多个分区。 然而,Apache Pulsar和Apache Kafka之间的根本区别在于Apache Kafka是以分区为存储中心,而Apache Pulsar是以Segment为存储中心。

在这里插入图片描述

Apache Pulsar将高性能的流(Apache Kafka所追求的)和灵活的传统队列(RabbitMQ所追求的)结合到一个统一的消息模型和API中。 Pulsar使用统一的API为用户提供一个支持流和队列的系统,且具有同样的高性能。

性能对比

Pulsar 表现最出色的就是性能,Pulsar 的速度比 Kafka 快得多,美国德克萨斯州一家名为 GigaOm (https://gigaom.com/) 的技术研究和分析公司对 Kafka 和 Pulsar 的性能做了比较,并证实了这一点。 在这里插入图片描述

kafka目前存在的痛点

扩展说明: kafka目前存在的痛点

Kafka 很难进行扩展,因为 Kafka 把消息持久化在 broker 中,迁移主题分区时,需要把分区的数据完全复制到其他 broker 中,这个操作非常耗时。当需要通过更改分区大小以获得更多的存储空间时,会与消息索引产生冲突,打乱消息顺序。因此,如果用户需要保证消息的顺序,Kafka 就变得非常棘手了。如果分区副本不处于 ISR(同步)状态,那么 leader 选取可能会紊乱。一般地,当原始主分区出现故障时,应该有一个 ISR 副本被征用,但是这点并不能完全保证。若在设置中并未规定只有 ISR 副本可被选为 leader 时,选出一个处于非同步状态的副本做 leader,这比没有 broker 服务该 partition 的情况更糟糕。使用 Kafka 时,你需要根据现有的情况并充分考虑未来的增量计划,规划 broker、主题、分区和副本的数量,才能避免 Kafka 扩展导致的问题。这是理想状况,实际情况很难规划,不可避免会出现扩展需求。Kafka 集群的分区再均衡会影响相关生产者和消费者的性能。发生故障时,Kafka 主题无法保证消息的完整性(特别是遇到第 3 点中的情况,需要扩展时极有可能丢失消息)。使用 Kafka 需要和 offset 打交道,这点让人很头痛,因为 broker 并不维护 consumer 的消费状态。如果使用率很高,则必须尽快删除旧消息,否则就会出现磁盘空间不够用的问题。众所周知,Kafka 原生的跨地域复制机制(MirrorMaker)有问题,即使只在两个数据中心也无法正常使用跨地域复制。因此,甚至 Uber 都不得不创建另一套解决方案来解决这个问题,并将其称为 uReplicator (https://eng.uber.com/ureplicator/)。要想进行实时数据分析,就不得不选用第三方工具,如 Apache Storm、Apache Heron 或 Apache Spark。同时,你需要确保这些第三方工具足以支撑传入的流量。Kafka 没有原生的多租户功能来实现租户的完全隔离,它是通过使用主题授权等安全功能来完成的。 Pulsar架构介绍

单个 Pulsar 集群由以下三部分组成:

多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。多个 bookie 的 BookKeeper 集群负责消息的持久化存储。一个zookeeper集群,用来处理多个Pulsar集群之间的协调任务。 在这里插入图片描述 Brokers介绍

Pulsar的broker是一个无状态组件, 主要负责运行另外的两个组件:

一个 HTTP 服务器, 它暴露了 REST 系统管理接口以及在生产者和消费者之间进行 Topic查找的API。一个调度分发器, 它是异步的TCP服务器,通过自定义 二进制协议应用于所有相关的数据传输。

出于性能考虑,消息通常从Managed Ledger缓存中分派出去,除非积压超过缓存大小。如果积压的消息对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。

最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用Java 客户端重新发布到其他区域

Zookeeper的元数据存储

Pulsar使用Apache Zookeeper进行元数据存储、集群配置和协调

配置存储: 存储租户,命名域和其他需要全局一致的配置项每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的)等等。 基于bookKeeper持久化存储

Apache Pulsar 为应用程序提供有保证的信息传递, 如果消息成功到达broker, 就认为其预期到达了目的地。

为了提供这种保证,未确认送达的消息需要持久化存储直到它们被确认送达。这种消息传递模式通常称为持久消息传递. 在Pulsar内部,所有消息都被保存并同步N份,例如,2个服务器保存四份,每个服务器上面都有镜像的RAID存储。

Pulsar用 Apache bookKeeper作为持久化存储。 bookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:

使pulsar能够利用独立的日志,称为ledgers. 可以随着时间的推移为topic创建多个Ledgers它为处理顺序消息提供了非常有效的存储保证了多系统挂掉时Ledgers的读取一致性提供不同的Bookies之间均匀的IO分布的特性它在容量和吞吐量方面都具有水平伸缩性。能够通过增加bookies立即增加容量到集群中,并提升吞吐量Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备 (一个用于日志,另一个用于一般存储) ,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XC09Noll-1649237152128)( image-20220405172507575.png)]

Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个bookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义:

Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,ledger只会以只读模式打开。最后,当ledger中的条目不再有用的时候,整个ledger可以被删除(ledger分布是跨Bookies的)。 Pulsar 代理

Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers。然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址。 例如在云环境或者Kubernetes以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了。

Pulsar proxy 为这个问题提供了一个解决方案, 为所有的broker提供了一个网关, 如果选择运行了Pulsar Proxy. 所有的客户都会通过这个代理而不是直接与brokers通信

Pulsar分布式集群安装 搭建 Pulsar 集群至少需要 3 个组件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(Broker 是 Pulsar 的自身实例)。这三个集群组件如下:

ZooKeeper 集群(3 个 ZooKeeper 节点组成)

bookie 集群(也称为 BookKeeper 集群,3 个 BookKeeper 节点组成)

broker 集群(3 个 Pulsar 节点组成)

Pulsar 的安装包已包含了搭建集群所需的各个组件库。无需单独下载 ZooKeeper 安装包和 BookKeeper 安装包。(此处zookeeper使用外置的) 节点ipmaster172.31.1.100slave1172.31.1.101slave2172.31.1.102 jdk安装部署

下载网址:https://www.oracle.com/java/technologies/downloads/#java8

上传至主节点之后进行解压,然后配置环境变量

vim /root/.bash_profile

添加(注意改为自己解压出的地址)

export JAVA_HOME=/opt/module/jdk1.8.0_212 export PATH=$PATH:$JAVA_HOME/bin zookeeper部署

下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

解压到指定目录,修改文件夹名称 tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/ cd /opt/module mv apache-zookeeper-3.5.7-bin/ zookeeper 增加myid文件 cd /opt/module/zookeeper/ mkdir zkData cd zkData vim myid

写入0,保存退出

修改配置文件 cd /opt/module/zookeeper/conf mv zoo_sample.cfg zoo.cfg vim zoo.cfg

增加以下配置(如果已经有的就进行修改、这里要配置8887端口,默认8080和pulsar有冲突)

dataDir=/opt/module/zookeeper/zkData admin.serverPort=8887 server.1=master:2888:3888 server.2=slave1:2888:3888 server.3=slave2:2888:3888 分发至slave1、slave2节点 scp -r /opt/module/zookeeper root@slave1:/opt/module/ scp -r /opt/module/zookeeper root@slave2:/opt/module/

复制完成后到slave1和slave2上修改myid 分别为 2 3

到三台机器上进行启动 cd /opt/module/zookeeper bin/zkServer.sh start

6.查看状态、确保zk没有问题在往下

bin/zkServer.sh status

在这里插入图片描述

Pulsar部署

下载地址:https://pulsar.apache.org/zh-CN/download/

选择对应版本下载就可以了,这里我选用的2.8.1

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vA9J9Foi-1649237152129)( image-20220405175016449.png)]

在 Linux 服务器上创建二个文件夹: brokers、bookies cd /export/server/ mkdir -p brokers mkdir -p bookies 将下载的pulsar安装包上传到linux服务器, 并解压,没有的目录自己创建一下 cd /export/software/ tar -zxf apache-pulsar-2.8.1-bin.tar.gz cd apache-pulsar-2.8.1 cp -r * /export/server/brokers/ cp -r * /export/server/bookies/ 修改bookkeeper集群的配置文件 cd /export/server/bookies/conf vim bookkeeper.conf 修改其第56行:修改本地ip地址 advertisedAddress=172.31.1.100 修改其39行: journalDirectory=/export/server/bookies/tmp/journal 修改其389行: ledgerDirectories=/export/server/bookies/tmp/ledger 修改617行: zkServers=172.31.1.100:2181,172.31.1.101:2181,172.31.1.102:2181 修改broker集群的配置文件 cd /export/server/brokers/conf/ vim broker.conf 修改第98行: 修改集群的名称 clusterName=pulsar-cluster 修改第23行: 配置zookeeper地址 zookeeperServers=172.31.1.100:2181,172.31.1.101:2181,172.31.1.102:2181 修改第26行: 配置zookeeper地址 configurationStoreServers=172.31.1.100:2181,172.31.1.101:2181,172.31.1.102:2181 修改第44行: 更改为本地ip地址 advertisedAddress=172.31.1.100

5.将配置好bookies目录和brokers目录发送到第二台和第三台

cd /export/server scp -r bookies/ brokers/ root@slave1:/export/server scp -r bookies/ brokers/ root@slave2:/export/server 修改第二台和第三台的broker的地址和bookies地址 第二台节点: cd /export/server/bookies/conf vim bookkeeper.conf 修改其第56行:修改本地ip地址 advertisedAddress=172.31.1.101 cd /export/server/brokers/conf/ vim broker.conf 修改第44行: 更改为本地ip地址 advertisedAddress=172.31.1.101 第三台节点: 都更改为对应IP地址即可 初始化元数据、主节点执行 cd /export/server/brokers/ bin/pulsar initialize-cluster-metadata \ --cluster pulsar-cluster \ --zookeeper 172.31.1.100:2181,172.31.1.101:2181,172.31.1.102:2181 \ --configuration-store 172.31.1.100:2181,172.31.1.101:2181,172.31.1.102:2181 \ --web-service-url http://172.31.1.100:8080,172.31.1.101:8080,172.31.1.102:8080 \ --web-service-url-tls https://172.31.1.100:8443,172.31.1.101:8443,172.31.1.102:8443 \ --broker-service-url pulsar://172.31.1.100:6650,172.31.1.101:6650,172.31.1.102:6650 \ --broker-service-url-tls pulsar+ssl://172.31.1.100:6651,172.31.1.101:6651,172.31.1.102:6651 启动bookie集群 先执行初始化(只需要在一台节点执行即可): 若出现提示,输入 Y,继续 cd /export/server/bookies bin/bookkeeper shell metaformat 依次在三台节点执行启动: cd /export/server/bookies bin/pulsar-daemon start bookie 验证是否启动: 可三台都检测 bin/bookkeeper shell bookiesanity 提示: Bookie sanity test succeeded 认为启动成功 启动broker 三台节点都需要执行: cd /export/server/brokers bin/pulsar-daemon start broker 检测是否启动: bin/pulsar-admin brokers list pulsar-cluster 测试集群 # 进入 brokers 目录,选取任一个 broker 节点执行命令即可 cd /export/server/brokers # 创建租户(租户名:my-tenant) ./bin/pulsar-admin tenants create my-tenant # 创建命名空间(命名空间名:my-tenant/my-namespace,它指定了租户 my-tenant) ./bin/pulsar-admin namespaces create my-tenant/my-namespace # 创建持久性分区topic(topic全名:persistent://my-tenant/my-namespace/my-topic;分区数为 3) ./bin/pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/my-topic -p 3 # 更新命名空间为其指定集群名 ./bin/pulsar-admin namespaces set-clusters my-tenant/my-namespace --clusters pulsar-cluster # 构建消费者: ./bin/pulsar-client consume persistent://public/default/test -s "consumer-test" # 构建生产者 ./bin/pulsar-client produce persistent://public/default/test --messages "hello-pulsar"

如果消费者可以正常接收到消息就代表安装成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VvlJ8piT-1649237152129)(image-20220405222306864.png)]

Pulsar admin manger图形界面安装

Pulsar admin manger是基于Plusar的可视化界面,接下来进行安装

下载 pulsar-admin cd /export/software wget https://dist.apache.org/repos/dist/release/pulsar/pulsar-manager/pulsar-manager-0.2.0/apache-pulsar-manager-0.2.0-bin.tar.gz 解压压缩包 tar -zxf apache-pulsar-manager-0.2.0-bin.tar.gz -C /export/server/ cd /export/server/pulsar-manager 接着再次解压目录下的tar包 tar -xvf pulsar-manager.tar 拷贝dist包到 pulsar-manager目录下并更名为ui cd /export/server/pulsar-manager/pulsar-manager cp -r ../dist ui 启动pulsar(这里默认启动是java -jar的方式,所以不要退出进程) ./bin/pulsar-manager 初始化超级用户密码 : CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token) curl -H "X-XSRF-TOKEN: $CSRF_TOKEN" -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" -H 'Content-Type: application/json' -X PUT http://localhost:7750/pulsar-manager/users/superuser -d '{"name": "pulsar", "password": "pulsar", "description": "test", "email": "[email protected]"}'

用户名为pulsar 密码为pulsar

访问Puslar UI

http://172.31.1.100:7750/ui/index.html

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UqAbXEuB-1649237152129)(image-20220405181736442.png)]

点击 new Environment, 设置集群环境即可

在这里插入图片描述

点击pulsar_cluster进入管理界面

在这里插入图片描述

如果想后台执行的话修改 vim /export/server/pulsar-manager/pulsar-manager/bin/pulsar-manager

加上nohup

启动命令为./bin/pulsar-manager &

#

声明

大部分来源于黑马:https://space.bilibili.com/415938397/search/video?keyword=pulsar



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有